View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsSession.java,v 1.6 2007/01/24 12:00:28 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import java.io.Serializable;
48  import java.util.ArrayList;
49  import java.util.HashMap;
50  import java.util.List;
51  import java.util.Vector;
52  import javax.jms.BytesMessage;
53  import javax.jms.Connection;
54  import javax.jms.Destination;
55  import javax.jms.IllegalStateException;
56  import javax.jms.InvalidDestinationException;
57  import javax.jms.InvalidSelectorException;
58  import javax.jms.JMSException;
59  import javax.jms.MapMessage;
60  import javax.jms.Message;
61  import javax.jms.MessageConsumer;
62  import javax.jms.MessageListener;
63  import javax.jms.MessageProducer;
64  import javax.jms.ObjectMessage;
65  import javax.jms.Queue;
66  import javax.jms.QueueBrowser;
67  import javax.jms.Session;
68  import javax.jms.StreamMessage;
69  import javax.jms.TemporaryQueue;
70  import javax.jms.TemporaryTopic;
71  import javax.jms.TextMessage;
72  import javax.jms.Topic;
73  import javax.jms.TopicSubscriber;
74  
75  import org.apache.commons.logging.Log;
76  import org.apache.commons.logging.LogFactory;
77  
78  import org.exolab.jms.message.BytesMessageImpl;
79  import org.exolab.jms.message.MapMessageImpl;
80  import org.exolab.jms.message.MessageConverter;
81  import org.exolab.jms.message.MessageConverterFactory;
82  import org.exolab.jms.message.MessageImpl;
83  import org.exolab.jms.message.MessageSessionIfc;
84  import org.exolab.jms.message.ObjectMessageImpl;
85  import org.exolab.jms.message.StreamMessageImpl;
86  import org.exolab.jms.message.TextMessageImpl;
87  import org.exolab.jms.server.ServerSession;
88  
89  
90  /***
91   * Client implementation of the <code>javax.jms.Session</code> interface.
92   *
93   * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
94   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
95   * @version $Revision: 1.6 $ $Date: 2007/01/24 12:00:28 $
96   */
97  class JmsSession implements Session, JmsMessageListener, MessageSessionIfc {
98  
99      /***
100      * The owner of the session.
101      */
102     private JmsConnection _connection;
103 
104     /***
105      * The proxy to the remote session implementation.
106      */
107     private ServerSession _session = null;
108 
109     /***
110      * If true, indicates that the session has been closed.
111      */
112     private volatile boolean _closed = false;
113 
114     /***
115      * Determines if this session is being closed.
116      */
117     private boolean _closing = false;
118 
119     /***
120      * Synchronization helper, used during close().
121      */
122     private final Object _closeLock = new Object();
123 
124     /***
125      * This flag determines whether message delivery is enabled or disabled.
126      * Message delivery if disabled if the enclosing connection is stopped.
127      */
128     private boolean _stopped = true;
129 
130     /***
131      * Indicates whether the consumer or the client will acknowledge any
132      * messages it receives. Ignored if the session is transacted. Legal values
133      * are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>
134      * and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
135      */
136     private final int _ackMode;
137 
138     /***
139      * Maintains the a map of JmsMessageConsumer.getConsumerId() ->
140      * JmsMessageConsumer objects.
141      */
142     private HashMap _consumers = new HashMap();
143 
144     /***
145      * Maintains a list of producers for the session.
146      */
147     private List _producers = new ArrayList();
148 
149     /***
150      * Maintain a collection of acked messages for a transacted session. These
151      * messages are only sent to the server on commit.
152      */
153     private List _messagesToSend = new ArrayList();
154 
155     /***
156      * This is the session's session listener which is used to receive all
157      * messages associated with all consumers registered with this session.
158      */
159     private MessageListener _listener = null;
160 
161     /***
162      * The message cache holds all messages for the session, allocated by a
163      * JmsConnectionConsumer.
164      */
165     private Vector _messageCache = new Vector();
166 
167     /***
168      * Monitor used to block consumers, if the session has been stopped, or no
169      * messages are available.
170      */
171     private final Object _receiveLock = new Object();
172 
173     /***
174      * The identitifier of the consumer performing a blocking receive, or
175      * <code>-1</code> if no consumer is currently performing a blocking
176      * receive.
177      */
178     private long _blockingConsumer = -1;
179 
180     /***
181      * The logger.
182      */
183     private static final Log _log = LogFactory.getLog(JmsSession.class);
184 
185 
186     /***
187      * Construct a new <code>JmsSession</code>
188      *
189      * @param connection the owner of the session
190      * @param transacted if <code>true</code>, the session is transacted.
191      * @param ackMode    indicates whether the consumer or the client will
192      *                   acknowledge any messages it receives. This parameter
193      *                   will be ignored if the session is transacted. Legal
194      *                   values are <code>Session.AUTO_ACKNOWLEDGE</code>,
195      *                   <code>Session.CLIENT_ACKNOWLEDGE</code> and
196      *                   <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
197      * @throws JMSException if the session cannot be created
198      */
199     public JmsSession(JmsConnection connection, boolean transacted,
200                       int ackMode) throws JMSException {
201         if (connection == null) {
202             throw new IllegalArgumentException("Argument 'connection' is null");
203         }
204 
205         _connection = connection;
206         _ackMode = (transacted) ? SESSION_TRANSACTED : ackMode;
207 
208         // construct the remote stub
209         _session = connection.getServerConnection().createSession(_ackMode,
210                                                                   transacted);
211 
212         // set up this instance to be a message listener
213         _session.setMessageListener(this);
214 
215         // now we need to check whether we should start the session
216         if (!connection.isStopped()) {
217             start();
218         }
219     }
220 
221     /***
222      * Creates a <code>BytesMessage</code> object. A <code>BytesMessage</code>
223      * object is used to send a message containing a stream of uninterpreted
224      * bytes.
225      *
226      * @throws JMSException if the JMS provider fails to create this message due
227      *                      to some internal error.
228      */
229     public BytesMessage createBytesMessage() throws JMSException {
230         ensureOpen();
231         return new BytesMessageImpl();
232     }
233 
234     /***
235      * Creates a <code>MapMessage</code> object. A <code>MapMessage</code>
236      * object is used to send a self-defining set of name-value pairs, where
237      * names are <code>String</code> objects and values are primitive values in
238      * the Java programming language.
239      *
240      * @throws JMSException if the JMS provider fails to create this message due
241      *                      to some internal error.
242      */
243     public MapMessage createMapMessage() throws JMSException {
244         ensureOpen();
245         return new MapMessageImpl();
246     }
247 
248     /***
249      * Creates a <code>Message</code> object. The <code>Message</code> interface
250      * is the root interface of all JMS messages. A <code>Message</code> object
251      * holds all the standard message header information. It can be sent when a
252      * message containing only header information is sufficient.
253      *
254      * @throws JMSException if the JMS provider fails to create this message due
255      *                      to some internal error.
256      */
257     public Message createMessage() throws JMSException {
258         ensureOpen();
259         return new MessageImpl();
260     }
261 
262     /***
263      * Creates an <code>ObjectMessage</code> object. An <code>ObjectMessage</code>
264      * object is used to send a message that contains a serializable Java
265      * object.
266      *
267      * @throws JMSException if the JMS provider fails to create this message due
268      *                      to some internal error.
269      */
270     public ObjectMessage createObjectMessage() throws JMSException {
271         ensureOpen();
272         return new ObjectMessageImpl();
273     }
274 
275     /***
276      * Creates an initialized <code>ObjectMessage</code> object. An
277      * <code>ObjectMessage</code> object is used to send a message that contains
278      * a serializable Java object.
279      *
280      * @param object the object to use to initialize this message
281      * @throws JMSException if the JMS provider fails to create this message due
282      *                      to some internal error.
283      */
284     public ObjectMessage createObjectMessage(Serializable object)
285             throws JMSException {
286         ensureOpen();
287         ObjectMessageImpl result = new ObjectMessageImpl();
288         result.setObject(object);
289         return result;
290     }
291 
292     /***
293      * Creates a <code>StreamMessage</code> object. A <code>StreamMessage</code>
294      * object is used to send a self-defining stream of primitive values in the
295      * Java programming language.
296      *
297      * @throws JMSException if the JMS provider fails to create this message due
298      *                      to some internal error.
299      */
300     public StreamMessage createStreamMessage() throws JMSException {
301         ensureOpen();
302         return new StreamMessageImpl();
303     }
304 
305     /***
306      * Creates a <code>TextMessage</code> object. A <code>TextMessage</code>
307      * object is used to send a message containing a <code>String</code>
308      * object.
309      *
310      * @throws JMSException if the JMS provider fails to create this message due
311      *                      to some internal error.
312      */
313     public TextMessage createTextMessage() throws JMSException {
314         ensureOpen();
315         return new TextMessageImpl();
316     }
317 
318     /***
319      * Creates an initialized <code>TextMessage</code> object. A
320      * <code>TextMessage</code> object is used to send a message containing a
321      * <code>String</code>.
322      *
323      * @param text the string used to initialize this message
324      * @throws JMSException if the JMS provider fails to create this message due
325      *                      to some internal error.
326      */
327     public TextMessage createTextMessage(String text) throws JMSException {
328         ensureOpen();
329         TextMessageImpl result = new TextMessageImpl();
330         result.setText(text);
331         return result;
332     }
333 
334     /***
335      * Determines if the session is transacted.
336      *
337      * @return <code>true</code> if the session is transacted
338      * @throws JMSException if the session is closed
339      */
340     public boolean getTransacted() throws JMSException {
341         ensureOpen();
342         return (_ackMode == SESSION_TRANSACTED);
343     }
344 
345     /***
346      * Returns the acknowledgement mode of the session. The acknowledgement mode
347      * is set at the time that the session is created. If the session is
348      * transacted, the acknowledgement mode is ignored.
349      *
350      * @return If the session is not transacted, returns the current
351      *         acknowledgement mode for the session. If the session is
352      *         transacted, returns SESSION_TRANSACTED.
353      * @throws JMSException if the JMS provider fails to return the
354      *                      acknowledgment mode due to some internal error.
355      * @see Connection#createSession
356      */
357     public int getAcknowledgeMode() throws JMSException {
358         ensureOpen();
359         return _ackMode;
360     }
361 
362     /***
363      * Creates a <code>MessageProducer</code> to send messages to the specified
364      * destination.
365      *
366      * @param destination the <code>Destination</code> to send to, or null if
367      *                    this is a producer which does not have a specified
368      *                    destination.
369      * @throws JMSException                if the session fails to create a
370      *                                     MessageProducer due to some internal
371      *                                     error.
372      * @throws InvalidDestinationException if an invalid destination is
373      *                                     specified.
374      */
375     public MessageProducer createProducer(Destination destination)
376             throws JMSException {
377         ensureOpen();
378         return new JmsMessageProducer(this, destination);
379     }
380 
381     /***
382      * Creates a <code>MessageConsumer</code> for the specified destination.
383      *
384      * @param destination the <code>Destination</code> to access.
385      * @throws JMSException                if the session fails to create a
386      *                                     consumer due to some internal error.
387      * @throws InvalidDestinationException if an invalid destination is
388      *                                     specified.
389      */
390 
391     public MessageConsumer createConsumer(Destination destination)
392             throws JMSException {
393         return createConsumer(destination, null);
394     }
395 
396     /***
397      * Creates a <code>MessageProducer</code> to receive messages from the
398      * specified destination, matching particular selection criteria
399      *
400      * @param destination     the <code>Destination</code> to access
401      * @param messageSelector only messages with properties matching the message
402      *                        selector expression are delivered. A value of null
403      *                        or an empty string indicates that there is no
404      *                        message selector for the message consumer.
405      * @throws JMSException                if the session fails to create a
406      *                                     MessageConsumer due to some internal
407      *                                     error.
408      * @throws InvalidDestinationException if an invalid destination is
409      *                                     specified.
410      * @throws InvalidSelectorException    if the message selector is invalid.
411      */
412     public MessageConsumer createConsumer(Destination destination,
413                                           String messageSelector)
414             throws JMSException {
415         return createConsumer(destination, messageSelector, false);
416     }
417 
418     /***
419      * Creates a <code>MessageConsumer</code> to receive messages from the
420      * specified destination, matching particular selection criteria. This
421      * method can specify whether messages published by its own connection
422      * should be delivered to it, if the destination is a topic. <P>In some
423      * cases, a connection may both publish and subscribe to a topic. The
424      * consumer <code>noLocal</code> attribute allows a consumer to inhibit the
425      * delivery of messages published by its own connection. The default value
426      * for this attribute is false. The <code>noLocal</code> value must be
427      * supported by destinations that are topics.
428      *
429      * @param destination     the <code>Destination</code> to access
430      * @param messageSelector only messages with properties matching the message
431      *                        selector expression are delivered. A value of null
432      *                        or an empty string indicates that there is no
433      *                        message selector for the message consumer.
434      * @param noLocal         if true, and the destination is a topic, inhibits
435      *                        the delivery of messages published by its own
436      *                        connection.  The behavior for <code>noLocal</code>
437      *                        is not specified if the destination is a queue.
438      * @throws JMSException                if the session fails to create a
439      *                                     MessageConsumer due to some internal
440      *                                     error.
441      * @throws InvalidDestinationException if an invalid destination is
442      *                                     specified.
443      * @throws InvalidSelectorException    if the message selector is invalid.
444      */
445     public MessageConsumer createConsumer(Destination destination,
446                                           String messageSelector,
447                                           boolean noLocal) throws JMSException {
448         long consumerId = allocateConsumer(destination, messageSelector,
449                                            noLocal);
450         JmsMessageConsumer consumer = new JmsMessageConsumer(this, consumerId,
451                                                              destination,
452                                                              messageSelector);
453         addConsumer(consumer);
454         return consumer;
455     }
456 
457     /***
458      * Creates a queue identity given a <code>Queue</code> name.
459      * <p/>
460      * <P>This facility is provided for the rare cases where clients need to
461      * dynamically manipulate queue identity. It allows the creation of a queue
462      * identity with a provider-specific name. Clients that depend on this
463      * ability are not portable.
464      * <p/>
465      * <P>Note that this method is not for creating the physical queue. The
466      * physical creation of queues is an administrative task and is not to be
467      * initiated by the JMS API. The one exception is the creation of temporary
468      * queues, which is accomplished with the <code>createTemporaryQueue</code>
469      * method.
470      *
471      * @param queueName the name of this <code>Queue</code>
472      * @return a <code>Queue</code> with the given name
473      * @throws JMSException if the session fails to create a queue due to some
474      *                      internal error.
475      */
476     public Queue createQueue(String queueName) throws JMSException {
477         ensureOpen();
478 
479         JmsQueue queue;
480 
481         if (queueName != null && queueName.length() > 0) {
482             queue = new JmsQueue(queueName);
483         } else {
484             throw new JMSException(
485                     "Cannot create a queue with null or empty name");
486         }
487 
488         return queue;
489     }
490 
491     /***
492      * Creates a topic identity given a <code>Topic</code> name.
493      * <p/>
494      * <P>This facility is provided for the rare cases where clients need to
495      * dynamically manipulate topic identity. This allows the creation of a
496      * topic identity with a provider-specific name. Clients that depend on this
497      * ability are not portable.
498      * <p/>
499      * <P>Note that this method is not for creating the physical topic. The
500      * physical creation of topics is an administrative task and is not to be
501      * initiated by the JMS API. The one exception is the creation of temporary
502      * topics, which is accomplished with the <code>createTemporaryTopic</code>
503      * method.
504      *
505      * @param topicName the name of this <code>Topic</code>
506      * @return a <code>Topic</code> with the given name
507      * @throws JMSException if the session fails to create a topic due to some
508      *                      internal error.
509      */
510     public Topic createTopic(String topicName) throws JMSException {
511         ensureOpen();
512 
513         JmsTopic topic;
514 
515         if (topicName != null && topicName.length() > 0) {
516             topic = new JmsTopic(topicName);
517         } else {
518             throw new JMSException("Invalid or null topic name specified");
519         }
520 
521         return topic;
522     }
523 
524     /***
525      * Creates a durable subscriber to the specified topic.
526      * <p/>
527      * <P>If a client needs to receive all the messages published on a topic,
528      * including the ones published while the subscriber is inactive, it uses a
529      * durable <code>TopicSubscriber</code>. The JMS provider retains a record
530      * of this durable subscription and insures that all messages from the
531      * topic's publishers are retained until they are acknowledged by this
532      * durable subscriber or they have expired.
533      * <p/>
534      * <P>Sessions with durable subscribers must always provide the same client
535      * identifier. In addition, each client must specify a name that uniquely
536      * identifies (within client identifier) each durable subscription it
537      * creates. Only one session at a time can have a <code>TopicSubscriber</code>
538      * for a particular durable subscription.
539      * <p/>
540      * <P>A client can change an existing durable subscription by creating a
541      * durable <code>TopicSubscriber</code> with the same name and a new topic
542      * and/or message selector. Changing a durable subscriber is equivalent to
543      * unsubscribing (deleting) the old one and creating a new one.
544      * <p/>
545      * <P>In some cases, a connection may both publish and subscribe to a topic.
546      * The subscriber <code>noLocal</code> attribute allows a subscriber to
547      * inhibit the delivery of messages published by its own connection. The
548      * default value for this attribute is false.
549      *
550      * @param topic the non-temporary <code>Topic</code> to subscribe to
551      * @param name  the name used to identify this subscription
552      * @throws JMSException                if the session fails to create a
553      *                                     subscriber due to some internal
554      *                                     error.
555      * @throws InvalidDestinationException if an invalid topic is specified.
556      */
557     public TopicSubscriber createDurableSubscriber(Topic topic, String name)
558             throws JMSException {
559         return createDurableSubscriber(topic, name, null, false);
560     }
561 
562     /***
563      * Creates a durable subscriber to the specified topic, using a message
564      * selector and specifying whether messages published by its own connection
565      * should be delivered to it.
566      * <p/>
567      * <P>If a client needs to receive all the messages published on a topic,
568      * including the ones published while the subscriber is inactive, it uses a
569      * durable <code>TopicSubscriber</code>. The JMS provider retains a record
570      * of this durable subscription and insures that all messages from the
571      * topic's publishers are retained until they are acknowledged by this
572      * durable subscriber or they have expired.
573      * <p/>
574      * <P>Sessions with durable subscribers must always provide the same client
575      * identifier. In addition, each client must specify a name which uniquely
576      * identifies (within client identifier) each durable subscription it
577      * creates. Only one session at a time can have a <code>TopicSubscriber</code>
578      * for a particular durable subscription. An inactive durable subscriber is
579      * one that exists but does not currently have a message consumer associated
580      * with it.
581      * <p/>
582      * <P>A client can change an existing durable subscription by creating a
583      * durable <code>TopicSubscriber</code> with the same name and a new topic
584      * and/or message selector. Changing a durable subscriber is equivalent to
585      * unsubscribing (deleting) the old one and creating a new one.
586      *
587      * @param topic           the non-temporary <code>Topic</code> to subscribe
588      *                        to
589      * @param name            the name used to identify this subscription
590      * @param messageSelector only messages with properties matching the message
591      *                        selector expression are delivered.  A value of
592      *                        null or an empty string indicates that there is no
593      *                        message selector for the message consumer.
594      * @param noLocal         if set, inhibits the delivery of messages
595      *                        published by its own connection
596      * @throws JMSException                if the session fails to create a
597      *                                     subscriber due to some internal
598      *                                     error.
599      * @throws InvalidDestinationException if an invalid topic is specified.
600      * @throws InvalidSelectorException    if the message selector is invalid.
601      */
602     public TopicSubscriber createDurableSubscriber(Topic topic, String name,
603                                                    String messageSelector,
604                                                    boolean noLocal)
605             throws JMSException {
606         ensureOpen();
607 
608         if (topic == null) {
609             throw new InvalidDestinationException("Cannot create durable subscriber: argument 'topic' is "
610                                                   + " null");
611         }
612         if (name == null || name.trim().length() == 0) {
613             throw new JMSException("Invalid subscription name specified");
614         }
615 
616         // check to see if the topic is a temporary topic. You cannot
617         // create a durable subscriber for a temporary topic
618         if (((JmsTopic) topic).isTemporaryDestination()) {
619             throw new InvalidDestinationException(
620                     "Cannot create a durable subscriber for a temporary topic");
621         }
622 
623         long consumerId = _session.createDurableConsumer((JmsTopic) topic, name,
624                                                          messageSelector,
625                                                          noLocal);
626         JmsTopicSubscriber subscriber = new JmsTopicSubscriber(this,
627                                                                consumerId,
628                                                                topic,
629                                                                messageSelector,
630                                                                noLocal);
631         addConsumer(subscriber);
632 
633         return subscriber;
634     }
635 
636     /***
637      * Creates a <code>QueueBrowser</code> object to peek at the messages on the
638      * specified queue.
639      *
640      * @param queue the queue to access
641      * @throws JMSException                if the session fails to create a
642      *                                     browser due to some internal error.
643      * @throws InvalidDestinationException if an invalid destination is
644      *                                     specified
645      */
646     public QueueBrowser createBrowser(Queue queue) throws JMSException {
647         return createBrowser(queue, null);
648     }
649 
650     /***
651      * Creates a <code>QueueBrowser</code> object to peek at the messages on the
652      * specified queue using a message selector.
653      *
654      * @param queue           the <code>queue</code> to access
655      * @param messageSelector only messages with properties matching the message
656      *                        selector expression are delivered. A value of null
657      *                        or an empty string indicates that there is no
658      *                        message selector for the message consumer.
659      * @throws JMSException                if the session fails to create a
660      *                                     browser due to some internal error.
661      * @throws InvalidDestinationException if an invalid destination is
662      *                                     specified
663      * @throws InvalidSelectorException    if the message selector is invalid.
664      */
665     public QueueBrowser createBrowser(Queue queue, String messageSelector)
666             throws JMSException {
667         ensureOpen();
668         if (!(queue instanceof JmsQueue)) {
669             throw new InvalidDestinationException("Cannot create QueueBrowser for destination="
670                                                   + queue);
671         }
672 
673         JmsQueue dest = (JmsQueue) queue;
674         // check to see if the queue is temporary. A temporary queue
675         // can only be used within the context of the owning connection
676         if (!checkForValidTemporaryDestination(dest)) {
677             throw new InvalidDestinationException(
678                     "Cannot create a queue browser for a temporary queue "
679                     + "that is not bound to this connection");
680         }
681 
682         long consumerId = _session.createBrowser(dest, messageSelector);
683         JmsQueueBrowser browser = new JmsQueueBrowser(this, consumerId, queue,
684                                                       messageSelector);
685         addConsumer(browser);
686         return browser;
687     }
688 
689     /***
690      * Creates a <code>TemporaryQueue</code> object. Its lifetime will be that
691      * of the <code>Connection</code> unless it is deleted earlier.
692      *
693      * @return a temporary queue identity
694      * @throws JMSException if the session fails to create a temporary queue due
695      *                      to some internal error.
696      */
697     public TemporaryQueue createTemporaryQueue() throws JMSException {
698         ensureOpen();
699         return JmsTemporaryQueue.create(getConnection());
700     }
701 
702     /***
703      * Creates a <code>TemporaryTopic</code> object. Its lifetime will be that
704      * of the <code>Connection</code> unless it is deleted earlier.
705      *
706      * @return a temporary topic identity
707      * @throws JMSException if the session fails to create a temporary topic due
708      *                      to some internal error.
709      */
710     public TemporaryTopic createTemporaryTopic() throws JMSException {
711         ensureOpen();
712         return JmsTemporaryTopic.create(getConnection());
713     }
714 
715     /***
716      * Unsubscribes a durable subscription that has been created by a client.
717      * <p/>
718      * <P>This method deletes the state being maintained on behalf of the
719      * subscriber by its provider.
720      * <p/>
721      * <P>It is erroneous for a client to delete a durable subscription while
722      * there is an active <code>MessageConsumer</code> or
723      * <code>TopicSubscriber</code> for the subscription, or while a consumed
724      * message is part of a pending transaction or has not been acknowledged in
725      * the session.
726      *
727      * @param name the name used to identify this subscription
728      * @throws JMSException                if the session fails to unsubscribe
729      *                                     to the durable subscription due to
730      *                                     some internal error.
731      * @throws InvalidDestinationException if an invalid subscription name is
732      *                                     specified.
733      */
734     public void unsubscribe(String name) throws JMSException {
735         ensureOpen();
736         _session.unsubscribe(name);
737     }
738 
739     /***
740      * Commit all messages done in this transaction
741      *
742      * @throws JMSException if the transaction cannot be committed
743      */
744     public void commit() throws JMSException {
745         ensureOpen();
746         ensureTransactional();
747 
748         // send all the cached messages to the server
749         getServerSession().send(_messagesToSend);
750         _messagesToSend.clear();
751 
752         // commit the session
753         getServerSession().commit();
754     }
755 
756     /***
757      * Rollback any messages done in this transaction
758      *
759      * @throws JMSException if the transaction cannot be rolled back
760      */
761     public void rollback() throws JMSException {
762         ensureOpen();
763         ensureTransactional();
764 
765         // clear all the cached messages
766         _messagesToSend.clear();
767 
768         // rollback the session
769         getServerSession().rollback();
770     }
771 
772     /***
773      * Close the session. This call will block until a receive or message
774      * listener in progress has completed. A blocked message consumer receive
775      * call returns <code>null</code> when this session is closed.
776      *
777      * @throws JMSException if the session can't be closed
778      */
779     public void close() throws JMSException {
780         boolean closing;
781         synchronized (_closeLock) {
782             closing = _closing;
783             _closing = true;
784         }
785         if (!closing) {
786             // must stop first to ensure any active listener completes
787             stop();
788 
789             // wake up any blocking consumer
790             synchronized (_receiveLock) {
791                 _receiveLock.notifyAll();
792             }
793 
794             _closed = true;
795 
796             // close the producers
797             JmsMessageProducer[] producers =
798                     (JmsMessageProducer[]) _producers.toArray(
799                             new JmsMessageProducer[0]);
800             for (int i = 0; i < producers.length; ++i) {
801                 JmsMessageProducer producer = producers[i];
802                 producer.close();
803             }
804 
805             // close the consumers
806             JmsMessageConsumer[] consumers =
807                     (JmsMessageConsumer[]) _consumers.values().toArray(
808                             new JmsMessageConsumer[0]);
809             for (int i = 0; i < consumers.length; ++i) {
810                 JmsMessageConsumer consumer = consumers[i];
811                 consumer.close();
812             }
813 
814             // deregister this with the connection
815             _connection.removeSession(this);
816             _connection = null;
817 
818             // clear any cached messages
819             _messagesToSend.clear();
820 
821             // issue a close to the remote session. This will release any
822             // allocated remote resources
823             getServerSession().close();
824             _session = null;
825         }
826     }
827 
828     /***
829      * Stop message delivery in this session, and restart sending messages with
830      * the oldest unacknowledged message
831      *
832      * @throws JMSException if the session can't be recovered
833      */
834     public void recover() throws JMSException {
835         ensureOpen();
836         if (getTransacted()) {
837             throw new IllegalStateException(
838                     "Cannot recover from a transacted session");
839         }
840 
841         getServerSession().recover();
842     }
843 
844     /***
845      * Returns the message listener associated with the session
846      *
847      * @return the message listener associated with the session, or
848      *         <code>null</code> if no listener is registered
849      * @throws JMSException if the session is closed
850      */
851     public MessageListener getMessageListener() throws JMSException {
852         ensureOpen();
853         return _listener;
854     }
855 
856     /***
857      * Sets the session's message listener.
858      *
859      * @param listener the session's message listener
860      * @throws JMSException if the session is closed
861      */
862     public void setMessageListener(MessageListener listener)
863             throws JMSException {
864         ensureOpen();
865         _listener = listener;
866     }
867 
868     /***
869      * Iterates through the list of messages added by an {@link
870      * JmsConnectionConsumer}, sending them to the registered listener
871      */
872     public void run() {
873         try {
874             while (!_messageCache.isEmpty()) {
875                 Message message = (Message) _messageCache.remove(0);
876                 _listener.onMessage(message);
877             }
878         } catch (Exception exception) {
879             _log.error("Error in the Session.run()", exception);
880         } finally {
881             // Clear message cache
882             _messageCache.clear();
883         }
884     }
885 
886     /***
887      * Set the message listener for a particular consumer.
888      * <p/>
889      * If a listener is already registered for the consumer, it will be
890      * automatically overwritten
891      *
892      * @param listener the message listener
893      * @throws JMSException if the listener can't be set
894      */
895     public void setMessageListener(JmsMessageConsumer listener)
896             throws JMSException {
897         ensureOpen();
898         setAsynchronous(listener.getConsumerId(), true);
899     }
900 
901     /***
902      * Remove a message listener
903      *
904      * @param listener the message listener to remove
905      * @throws JMSException if the listener can't be removed
906      */
907     public void removeMessageListener(JmsMessageConsumer listener)
908             throws JMSException {
909         ensureOpen();
910         setAsynchronous(listener.getConsumerId(), false);
911     }
912 
913     /***
914      * This will start message delivery to this session. If message delivery has
915      * already started, or the session is currently being closed then this is a
916      * no-op.
917      *
918      * @throws JMSException if message delivery can't be started
919      */
920     public void start() throws JMSException {
921         ensureOpen();
922         synchronized (_closeLock) {
923             if (_stopped && !_closing) {
924                 getServerSession().start();
925                 _stopped = false;
926             }
927         }
928     }
929 
930     /***
931      * This will stop message delivery to this session. If message delivery has
932      * already stoped then this is a no-op.
933      *
934      * @throws JMSException if message delivery can't be stopped
935      */
936     public void stop() throws JMSException {
937         ensureOpen();
938         synchronized (_closeLock) {
939             if (!_stopped) {
940                 getServerSession().stop();
941                 _stopped = true;
942             }
943         }
944     }
945 
946     /***
947      * Acknowledge the specified message. This is only applicable for
948      * CLIENT_ACKNOWLEDGE sessions. For other session types, the request is
949      * ignored.
950      * <p/>
951      * Acking a message automatically acks all those that have come before it.
952      *
953      * @param message the message to acknowledge
954      * @throws JMSException if the message can't be acknowledged
955      */
956     public void acknowledgeMessage(Message message) throws JMSException {
957         ensureOpen();
958         if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
959             MessageImpl impl = (MessageImpl) message;
960             getServerSession().acknowledgeMessage(impl.getConsumerId(),
961                                                   impl.getAckMessageID());
962         }
963     }
964 
965     /***
966      * Enable or disable asynchronous message delivery for the specified
967      * consumer.
968      *
969      * @param consumerId the consumer identifier
970      * @param enable     <code>true</code> to enable; <code>false</code> to
971      *                   disable
972      * @throws JMSException if message delivery cannot be enabled or disabled
973      */
974     public void setAsynchronous(long consumerId, boolean enable)
975             throws JMSException {
976         ensureOpen();
977         getServerSession().setAsynchronous(consumerId, enable);
978     }
979 
980     /***
981      * Deliver a message.
982      *
983      * @param message the message to deliver
984      * @return <code>true</code> if the message was delivered; otherwise
985      *         <code>false</code>.
986      */
987     public boolean onMessage(MessageImpl message) {
988         boolean delivered = false;
989         message.setJMSXRcvTimestamp(System.currentTimeMillis());
990 
991         long consumerId = message.getConsumerId();
992         JmsMessageConsumer consumer
993                 = (JmsMessageConsumer) _consumers.get(new Long(consumerId));
994         // tag the session that received this message
995         message.setSession(this);
996         if (consumer != null) {
997             // if a listener is defined for the session then send all the
998             // messages to that listener regardless if any consumers are
999             // have registered listeners...bit confusing but this is what
1000             // I believe it should do
1001             if (_listener != null) {
1002                 try {
1003                     _listener.onMessage(message);
1004                     delivered = true;
1005                 } catch (Throwable exception) {
1006                     _log.error("MessageListener threw exception", exception);
1007                 }
1008             } else {
1009                 delivered = consumer.onMessage(message);
1010             }
1011         } else {
1012             _log.error("Received a message for an inactive consumer");
1013         }
1014         return delivered;
1015     }
1016 
1017     /***
1018      * Inform the session that there is a message available for a synchronous
1019      * consumer.
1020      */
1021     public void onMessageAvailable() {
1022         // wake up any blocking consumer
1023         notifyConsumer();
1024     }
1025 
1026     /***
1027      * Receive the next message that arrives within the specified timeout
1028      * interval. This call blocks until a message arrives, the timeout expires,
1029      * or this message consumer is closed. A timeout of <code>0</code> never
1030      * expires and the call blocks indefinitely.
1031      *
1032      * @param consumerId the consumer identifier
1033      * @param timeout    the timeout interval, in milliseconds
1034      * @return the next message produced for the consumer, or <code>null</code>
1035      *         if the timeout expires or the consumer concurrently closed
1036      * @throws JMSException if the next message can't be received
1037      */
1038     public MessageImpl receive(long consumerId, long timeout)
1039             throws JMSException {
1040         MessageImpl message = null;
1041         ensureOpen();
1042 
1043         synchronized (_receiveLock) {
1044             if (_blockingConsumer != -1) {
1045                 throw new IllegalStateException(
1046                         "Session cannot be accessed concurrently");
1047             }
1048 
1049             _blockingConsumer = consumerId;
1050 
1051             long start = (timeout != 0) ? System.currentTimeMillis() : 0;
1052             try {
1053                 while (message == null && !isClosed()) {
1054                     if (timeout == 0) {
1055                         message = getServerSession().receive(consumerId, 0);
1056                     } else {
1057                         message = getServerSession().receive(consumerId,
1058                                                              timeout);
1059                     }
1060                     if (message == null && !isClosed()) {
1061                         // no message received in the required time.
1062                         // Wait for a notification from the server that
1063                         // a message has become available.
1064                         try {
1065                             if (timeout == 0) {
1066                                 _receiveLock.wait();
1067                             } else {
1068                                 long elapsed = System.currentTimeMillis()
1069                                         - start;
1070                                 if (elapsed >= timeout) {
1071                                     // no message received in the required time
1072                                     break;
1073                                 } else {
1074                                     // adjust the timeout so that the client
1075                                     // only waits as long as the original
1076                                     // timeout
1077                                     timeout -= elapsed;
1078                                 }
1079                                 _receiveLock.wait(timeout);
1080                             }
1081                         } catch (InterruptedException ignore) {
1082                             // no-op
1083                         }
1084                     }
1085                 }
1086 
1087                 if (message != null) {
1088                     message.setSession(this);
1089                     if (_ackMode == AUTO_ACKNOWLEDGE
1090                             || _ackMode == DUPS_OK_ACKNOWLEDGE) {
1091                         getServerSession().acknowledgeMessage(
1092                                 message.getConsumerId(),
1093                                 message.getMessageId().toString());
1094                     }
1095                 }
1096             } finally {
1097                 _blockingConsumer = -1;
1098             }
1099         }
1100         return message;
1101     }
1102 
1103     /***
1104      * Receive the next message if one is immediately available.
1105      *
1106      * @param consumerId the consumer identifier
1107      * @return the next message produced for this consumer, or <code>null</code>
1108      *         if one is not available
1109      * @throws JMSException if the next message can't be received
1110      */
1111     public MessageImpl receiveNoWait(long consumerId) throws JMSException {
1112         ensureOpen();
1113         MessageImpl message = getServerSession().receiveNoWait(consumerId);
1114         if (message != null) {
1115             message.setSession(this);
1116             if (_ackMode == AUTO_ACKNOWLEDGE
1117                     || _ackMode == DUPS_OK_ACKNOWLEDGE) {
1118                 getServerSession().acknowledgeMessage(
1119                         message.getConsumerId(),
1120                         message.getMessageId().toString());
1121             }
1122         }
1123         return message;
1124     }
1125 
1126     /***
1127      * Browse up to count messages.
1128      *
1129      * @param consumerId the consumer identifier
1130      * @param count      the maximum number of messages to receive
1131      * @return a list of {@link MessageImpl} instances
1132      * @throws JMSException for any JMS error
1133      */
1134     public List browse(long consumerId, int count)
1135             throws JMSException {
1136         ensureOpen();
1137         return getServerSession().browse(consumerId, count);
1138     }
1139 
1140     /***
1141      * Send the specified message to the server.
1142      *
1143      * @param message the message to send
1144      * @throws JMSException if the message can't be sent
1145      */
1146     protected void sendMessage(Message message) throws JMSException {
1147         if (getTransacted()) {
1148             // if the session is transacted then cache the message locally.
1149             // and wait for a commit or a rollback
1150             if (message instanceof MessageImpl) {
1151                 try {
1152                     message = (Message) ((MessageImpl) message).clone();
1153                 } catch (CloneNotSupportedException error) {
1154                     throw new JMSException(error.getMessage());
1155                 }
1156             } else {
1157                 message = convert(message);
1158             }
1159             _messagesToSend.add(message);
1160         } else {
1161             if (!(message instanceof MessageImpl)) {
1162                 message = convert(message);
1163             }
1164             getServerSession().send((MessageImpl) message);
1165         }
1166     }
1167 
1168     /***
1169      * Returns the server session.
1170      *
1171      * @return the server session
1172      */
1173     protected ServerSession getServerSession() {
1174         return _session;
1175     }
1176 
1177     /***
1178      * Return a reference to the connection that created this session.
1179      *
1180      * @return the owning connection
1181      */
1182     protected JmsConnection getConnection() {
1183         return _connection;
1184     }
1185 
1186     /***
1187      * Creates a new message consumer, returning its identity.
1188      *
1189      * @param destination the destination to access
1190      * @param selector    the message selector. May be <code>null</code>
1191      * @param noLocal     if true, and the destination is a topic, inhibits the
1192      *                    delivery of messages published by its own connection.
1193      *                    The behavior for <code>noLocal</code> is not specified
1194      *                    if the destination is a queue.
1195      * @throws JMSException                if the session fails to create a
1196      *                                     MessageConsumer due to some internal
1197      *                                     error.
1198      * @throws InvalidDestinationException if an invalid destination is
1199      *                                     specified.
1200      * @throws InvalidSelectorException    if the message selector is invalid.
1201      */
1202     protected long allocateConsumer(Destination destination,
1203                                     String selector, boolean noLocal)
1204             throws JMSException {
1205         ensureOpen();
1206 
1207         if (!(destination instanceof JmsDestination)) {
1208             throw new InvalidDestinationException("Cannot create MessageConsumer for destination="
1209                                                   + destination);
1210         }
1211         JmsDestination dest = (JmsDestination) destination;
1212 
1213         // check to see if the destination is temporary. A temporary destination
1214         // can only be used within the context of the owning connection
1215         if (!checkForValidTemporaryDestination(dest)) {
1216             throw new InvalidDestinationException(
1217                     "Trying to create a MessageConsumer for a temporary "
1218                     + "destination that is not bound to this connection");
1219         }
1220 
1221         return _session.createConsumer(dest, selector, noLocal);
1222     }
1223 
1224     /***
1225      * This method checks the destination. If the destination is not temporary
1226      * then return true. If it is a temporary destination and it is owned by
1227      * this session's connection then it returns true. If it is a tmeporary
1228      * destination and it is owned by another connection then it returns false
1229      *
1230      * @param destination the destination to check
1231      * @return <code>true</code> if the destination is valid
1232      */
1233     protected boolean checkForValidTemporaryDestination(
1234             JmsDestination destination) {
1235         boolean result = false;
1236 
1237         if (destination.isTemporaryDestination()) {
1238             JmsTemporaryDestination temp =
1239                     (JmsTemporaryDestination) destination;
1240 
1241             // check  that this temp destination is owned by the session's
1242             // connection.
1243             if (temp.validForConnection(getConnection())) {
1244                 result = true;
1245             }
1246         } else {
1247             result = true;
1248         }
1249 
1250         return result;
1251     }
1252 
1253     /***
1254      * Add a consumer to the list of consumers managed by this session.
1255      *
1256      * @param consumer the consumer to add
1257      */
1258     protected void addConsumer(JmsMessageConsumer consumer) {
1259         _consumers.put(new Long(consumer.getConsumerId()), consumer);
1260     }
1261 
1262     /***
1263      * Remove a consumer, deregistering it on the server.
1264      *
1265      * @param consumer the consumer to remove
1266      * @throws JMSException if removal fails
1267      */
1268     protected void removeConsumer(JmsMessageConsumer consumer)
1269             throws JMSException {
1270         long consumerId = consumer.getConsumerId();
1271         try {
1272             _session.closeConsumer(consumerId);
1273         } finally {
1274             _consumers.remove(new Long(consumerId));
1275         }
1276     }
1277 
1278     /***
1279      * Add a producer to the list of producers managed by this session.
1280      *
1281      * @param producer the producer to add
1282      */
1283     protected void addProducer(JmsMessageProducer producer) {
1284         _producers.add(producer);
1285     }
1286 
1287     /***
1288      * Remove the producer from the list of managed producers.
1289      *
1290      * @param producer the producer to remove
1291      */
1292     protected void removeProducer(JmsMessageProducer producer) {
1293         _producers.remove(producer);
1294     }
1295 
1296     /***
1297      * Check if the session is closed.
1298      *
1299      * @return <code>true</code> if the session is closed
1300      */
1301     protected final boolean isClosed() {
1302         return _closed;
1303     }
1304 
1305     /***
1306      * Add a message to the message cache. This message will be processed when
1307      * the run() method is called.
1308      *
1309      * @param message the message to add.
1310      */
1311     protected void addMessage(Message message) {
1312         _messageCache.add(message);
1313     }
1314 
1315     /***
1316      * Verifies that the session isn't closed.
1317      *
1318      * @throws IllegalStateException if the session is closed
1319      */
1320     protected void ensureOpen() throws IllegalStateException {
1321         if (isClosed()) {
1322             throw new IllegalStateException(
1323                     "Cannot perform operation - session has been closed");
1324         }
1325     }
1326 
1327     /***
1328      * Verifies that the session is transactional.
1329      *
1330      * @throws IllegalStateException if the session isn't transactional
1331      */
1332     private void ensureTransactional() throws IllegalStateException {
1333         if (_ackMode != SESSION_TRANSACTED) {
1334             throw new IllegalStateException(
1335                     "Cannot perform operatiorn - session is not transactional");
1336         }
1337     }
1338 
1339     /***
1340      * Notifies any blocking synchronous consumer.
1341      */
1342     private void notifyConsumer() {
1343         synchronized (_receiveLock) {
1344             _receiveLock.notifyAll();
1345         }
1346     }
1347 
1348     /***
1349      * Convert a message to its corresponding OpenJMS implementation.
1350      *
1351      * @param message the message to convert
1352      * @return the OpenJMS implementation of the message
1353      * @throws JMSException for any error
1354      */
1355     private Message convert(Message message) throws JMSException {
1356         MessageConverter converter =
1357                 MessageConverterFactory.create(message);
1358         return converter.convert(message);
1359     }
1360 
1361 }
1362